前一回,透過 Console 試用了一下 Nova Reel 生成素材。接著,我們需要繼續完成介面、API 傳遞邏輯、權限配置等設定。
import os, json, random, boto3, jwt
from datetime import datetime
from urllib.parse import unquote
# ---- Environment variables ----
# Every setting is read from the process environment; the second argument is
# the literal used when the variable is not set.
BUCKET_NAME = os.getenv("BUCKET_NAME", "exsky-backup-media")
# NOTE(review): the fallback signing secret is a hard-coded literal -- confirm
# JWT_SECRET is always provided in deployed environments.
SECRET = os.getenv("JWT_SECRET", "mysecret")
REGION = os.getenv("AWS_REGION", "ap-northeast-1")
MODEL_ID = os.getenv("MODEL_ID", "amazon.nova-reel-v1:0")
TABLE_NAME = os.getenv("JOBS_TABLE", "NovaReelJobs")

# ---- AWS clients ----
# Created once at import time and shared by every invocation of the handler.
brt = boto3.client(
    "bedrock-runtime",
    region_name=REGION,
)
dynamo = boto3.resource(
    "dynamodb",
    region_name=REGION,
).Table(TABLE_NAME)
s3 = boto3.client(
    "s3",
    region_name=REGION,
)
# ---- 共用回應函數 ----
def _cors_response(status, body):
    return {
        "statusCode": status,
        "headers": {
            "Access-Control-Allow-Origin": "https://vlog.nipapa.tw",
            "Access-Control-Allow-Methods": "GET,POST,OPTIONS",
            "Access-Control-Allow-Headers": "Authorization,Content-Type"
        },
        "body": json.dumps(body, ensure_ascii=False)
    }
# ---- Lambda Handler ----
def _parse_s3_uri(uri):
    """Split ``s3://bucket/key`` into ``(bucket, key)``; ``(None, None)`` if malformed."""
    scheme = "s3://"
    if not isinstance(uri, str) or not uri.startswith(scheme):
        return None, None
    bucket, _, key = uri[len(scheme):].partition("/")
    if not bucket:
        return None, None
    return bucket, key.strip("/")


def _create_reel(event, username):
    """Handle ``POST /reels``: start an async generation job and record it.

    Returns a CORS response: 400 when the request body is unusable, 500
    (carrying the error text) when the Bedrock or DynamoDB call raises,
    otherwise 200 with the job ARN.
    """
    raw_body = event.get("body")
    if raw_body is None:
        # No body at all is treated like an empty JSON object (all defaults).
        raw_body = "{}"
    try:
        body = json.loads(raw_body)
    except (TypeError, ValueError):
        return _cors_response(400, {"error": "Invalid request body"})
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, number, ...) has no fields.
        return _cors_response(400, {"error": "Invalid request body"})

    prompt = body.get("prompt", "A cinematic shot of Taipei skyline at sunset")
    dimension = body.get("dimension", "1280x720")
    try:
        duration = int(body.get("duration_seconds", 6))
        fps = int(body.get("fps", 24))
        seed = int(body.get("seed", random.randint(0, 2147483646)))
    except (TypeError, ValueError):
        # Reject bad numbers here instead of letting the handler crash.
        return _cors_response(
            400, {"error": "duration_seconds, fps and seed must be integers"}
        )

    # Output location: everything a user generates goes under their own prefix.
    s3_output_uri = f"s3://{BUCKET_NAME}/{username}/reels/"
    model_input = {
        "taskType": "TEXT_VIDEO",
        "textToVideoParams": {"text": prompt},
        "videoGenerationConfig": {
            "fps": fps,
            "durationSeconds": duration,
            "dimension": dimension,
            "seed": seed,
        },
    }
    try:
        resp = brt.start_async_invoke(
            modelId=MODEL_ID,
            modelInput=model_input,
            outputDataConfig={"s3OutputDataConfig": {"s3Uri": s3_output_uri}},
        )
        job_arn = resp["invocationArn"]
        dynamo.put_item(
            Item={
                "job_arn": job_arn,
                "username": username,
                "status": "InProgress",
                "created_at": datetime.utcnow().isoformat(),
                "prompt": prompt,
                "s3_output": s3_output_uri,
            }
        )
    except Exception as e:
        # Send the error detail back to the frontend (with CORS headers)
        # rather than letting the invocation fail opaquely.
        return _cors_response(500, {"error": str(e)})
    return _cors_response(200, {"job_arn": job_arn, "status": "InProgress"})


def _get_reel_status(event, username):
    """Handle ``GET /reels/{job_arn}``: report job status, plus a download URL when done.

    Returns a CORS response: 400 when the path parameter is missing, 403 when
    the job's output is not under the caller's prefix, 500 (carrying the error
    text) when an AWS call raises, otherwise 200 with the status payload.
    """
    path_params = event.get("pathParameters") or {}
    encoded_arn = path_params.get("job_arn")
    if not encoded_arn:
        return _cors_response(400, {"error": "Missing job_arn"})
    job_arn = unquote(encoded_arn)
    owner_prefix = f"{username}/reels/"
    try:
        job = brt.get_async_invoke(invocationArn=job_arn)
        status = job["status"]
        output_uri = (
            job.get("outputDataConfig", {})
            .get("s3OutputDataConfig", {})
            .get("s3Uri")
        )
        bucket, out_prefix = _parse_s3_uri(output_uri)
        if bucket is None:
            # No usable URI in the response: fall back to the caller's prefix.
            out_prefix = owner_prefix.rstrip("/")
        elif bucket != BUCKET_NAME or not f"{out_prefix}/".startswith(owner_prefix):
            # The job writes somewhere other than this user's folder, so the
            # caller must not learn its status or receive a download link.
            return _cors_response(403, {"error": "Forbidden"})

        if status == "Completed":
            # NOTE(review): per the Bedrock/Nova Reel docs each job's files are
            # written to a per-invocation subfolder of the configured prefix --
            # confirm the folder name matches the ARN's trailing ID.
            invocation_id = job_arn.rsplit("/", 1)[-1]
            if not out_prefix.endswith(f"/{invocation_id}"):
                out_prefix = f"{out_prefix}/{invocation_id}"
            key = f"{out_prefix}/output.mp4"
            presigned = s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": BUCKET_NAME, "Key": key},
                ExpiresIn=3600,
            )
            body = {
                "status": "Completed",
                "s3_uri": f"s3://{BUCKET_NAME}/{key}",
                "presigned_url": presigned,
            }
        elif status == "Failed":
            body = {"status": "Failed", "message": job.get("failureMessage", "Unknown")}
        else:
            body = {"status": status}
    except Exception as e:
        return _cors_response(500, {"error": str(e)})
    return _cors_response(200, body)


def lambda_handler(event, context):
    """Entry point: answer CORS preflight, authenticate, then route the request.

    Args:
        event: Lambda proxy event; method and path are read from
            ``requestContext.http``, the token from the ``Authorization``
            header.
        context: Lambda context object (unused).

    Returns:
        A response dict built by ``_cors_response``: 200 for preflight and
        successful calls, 401 for a missing or invalid token, 405 when no
        route matches, or whatever the matched route handler returns.
    """
    http_info = event.get("requestContext", {}).get("http", {})
    method = http_info.get("method", "")
    path = http_info.get("path", "")

    # --- CORS preflight ---
    if method == "OPTIONS":
        return _cors_response(200, {})

    # --- Verify JWT ---
    headers = event.get("headers", {}) or {}
    auth = headers.get("authorization") or headers.get("Authorization") or ""
    if not auth.startswith("Bearer "):
        return _cors_response(401, {"error": "Missing token"})
    token = auth.split(" ")[1]
    try:
        decoded = jwt.decode(token, SECRET, algorithms=["HS256"])
        username = decoded.get("username", "unknown")
    except Exception:
        return _cors_response(401, {"error": "Invalid token"})

    # --- Create a video (POST /reels) ---
    if method == "POST" and path.endswith("/reels"):
        return _create_reel(event, username)

    # --- Query video status (GET /reels/{job_arn}) ---
    if method == "GET" and "/reels/" in path:
        return _get_reel_status(event, username)

    # --- Any other method ---
    return _cors_response(405, {"error": "Method not allowed"})